/*** Eclipse Class Decompiler plugin, copyright (c) 2012 Chao Chen (cnfree2000@hotmail.com) ***/
package com.sankuai.xm.kafka.client.factory;

import com.sankuai.xm.kafka.client.IConsumerProcessor;
import com.sankuai.xm.kafka.client.KafkaClient;
import com.sankuai.xm.kafka.client.exception.ConsumerConfigException;
import com.sankuai.xm.kafka.client.utils.StackTraceUtil;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerBuildFactory extends KafkaBuildFactory {
	private static final Logger log = LoggerFactory
			.getLogger(KafkaConsumerBuildFactory.class);

	public static IConsumerProcessor init() throws Exception {
		Properties properties = KafkaBuildFactory.init0("consumer.properties");

		String topic = (String) properties.get("kafka.topic");
		if (topic == null) {
			log.error("xm-kafka-client, kafka.topic is null.");
			throw new ConsumerConfigException(
					"kafka Topic is null, can't init kafka client.");
		}

		String zookeeperConnect = (String) properties.get("zookeeper.connect");
		if (zookeeperConnect == null) {
			log.error("xm-kafka-client, zookeeper.connect is null.");
			throw new ConsumerConfigException(
					"kafka Zookeeper path is null, can't init kafka client.");
		}

		String groupId = (String) properties.get("group.id");
		if (groupId == null) {
			log.error("xm-kafka-client, kafka.group.id is null.");
			throw new ConsumerConfigException(
					"kafka.group.id is null, can't init kafka client.");
		}

		String threadCount = (String) properties.get("consumer.thread.num");
		if (threadCount == null) {
			log.error("xm-kafka-client, consumer.thread.num is null.");
			throw new ConsumerConfigException(
					"consumer.thread.num is null, can't init kafka client.");
		}

		Properties initProperties = new Properties();
		initProperties.put("kafka.topic", topic);
		initProperties.put("zookeeper.connect", zookeeperConnect);
		initProperties.put("group.id", groupId);
		initProperties.put("consumer.thread.num", threadCount);
		initProperties.put("zookeeper.session.timeout.ms", "5000");
		initProperties.put("zookeeper.sync.time.ms", "200");
		initProperties.put("auto.commit.interval.ms", "1000");
		IConsumerProcessor consumer;
		try {
			consumer = KafkaClient.buildConsumerFactory(initProperties);
		} catch (Exception e) {
			log.error(
					"xm-kafka-client, Kafka Consumer build Exception. exception : {}",
					StackTraceUtil.getStackTrace(e));
			throw new ConsumerConfigException(e);
		}
		return consumer;
	}

	public static IConsumerProcessor init(String topicKey, String groupIdKey,
			String threadNumKey, String zookeeperConnectKey) throws Exception {
		Properties properties = KafkaBuildFactory.init0("consumer.properties");

		String topic = (String) properties.get(topicKey);
		if (topic == null) {
			log.error("xm-kafka-client, {} is null.", topicKey);
			throw new ConsumerConfigException(
					"kafka Topic is null, can't init kafka client.");
		}

		String zookeeperConnect = (String) properties.get(zookeeperConnectKey);
		if (zookeeperConnect == null) {
			log.error("xm-kafka-client, {} is null.", zookeeperConnectKey);
			throw new ConsumerConfigException(
					"kafka Zookeeper path is null, can't init kafka client.");
		}

		String groupId = (String) properties.get(groupIdKey);
		if (groupId == null) {
			log.error("xm-kafka-client, {} is null.", groupIdKey);
			throw new ConsumerConfigException(
					"kafka.group.id is null, can't init kafka client.");
		}

		String threadCount = (String) properties.get(threadNumKey);
		if (threadCount == null) {
			log.error("xm-kafka-client, {} is null.", threadNumKey);
			throw new ConsumerConfigException(
					"consumer.thread.num is null, can't init kafka client.");
		}

		Properties initProperties = new Properties();
		initProperties.put("kafka.topic", topic);
		initProperties.put("zookeeper.connect", zookeeperConnect);
		initProperties.put("group.id", groupId);
		initProperties.put("consumer.thread.num", threadCount);
		initProperties.put("zookeeper.session.timeout.ms", "5000");
		initProperties.put("zookeeper.sync.time.ms", "200");
		initProperties.put("auto.commit.interval.ms", "1000");
		IConsumerProcessor consumer;
		try {
			consumer = KafkaClient.buildConsumerFactory(initProperties);
		} catch (Exception e) {
			log.error(
					"xm-kafka-client, Kafka Consumer build Exception. exception : {}",
					StackTraceUtil.getStackTrace(e));
			throw new ConsumerConfigException(e);
		}
		return consumer;
	}
}